/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.elasticjob.lite.internal.listener;

import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.lite.internal.config.RescheduleListenerManager;
import org.apache.shardingsphere.elasticjob.lite.internal.election.ElectionListenerManager;
import org.apache.shardingsphere.elasticjob.lite.internal.failover.FailoverListenerManager;
import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeListenerManager;
import org.apache.shardingsphere.elasticjob.lite.internal.instance.ShutdownListenerManager;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.MonitorExecutionListenerManager;
import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingListenerManager;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.lite.internal.trigger.TriggerListenerManager;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;

import java.util.Collection;

/**
 * Listener manager facade.
 *
 * <p>Builds every listener manager a job needs and starts them together through
 * {@link #startAllListeners()}. The behaviour notes attached to the fields below are
 * carried over from the original author's comments; the logic they describe lives in
 * the respective delegates, not in this class.</p>
 */
public final class ListenerManager {

    /**
     * Job node storage; used here to attach the registry center connection state listener.
     */
    private final JobNodeStorage jobNodeStorage;

    /**
     * Leader election listening.
     * <ol>
     * <li>When the leader node is deleted or does not exist, {@code leaderService.electLeader} is triggered to run an election.</li>
     * <li>When this node is disabled while being the leader, its leader node is removed so that
     * {@code leaderService.electLeader} is triggered.</li>
     * </ol>
     */
    private final ElectionListenerManager electionListenerManager;

    /**
     * Sharding listening.
     * <ol>
     * <li>When {@code shardingTotalCount} in the config node changes, {@code shardingService.setReshardingFlag} is called,
     * forcing the shards to be reassigned when the job executes.</li>
     * <li>When a server or instance node changes, {@code shardingService.setReshardingFlag} is called,
     * forcing the shards to be reassigned when the job executes.</li>
     * </ol>
     */
    private final ShardingListenerManager shardingListenerManager;

    /**
     * Failover listening.
     * <ol>
     * <li>When an instance node goes offline, the failover tasks it owned are failed over again to other nodes.</li>
     * <li>When {@code isFailover} in the config node changes and failover is no longer required,
     * the existing failover tasks are cleared directly.</li>
     * </ol>
     */
    private final FailoverListenerManager failoverListenerManager;

    /**
     * Monitor execution listening: when {@code isMonitorExecution} in the config node changes and execution
     * information no longer needs to be reported, the existing nodes that show execution state are cleared directly.
     */
    private final MonitorExecutionListenerManager monitorExecutionListenerManager;

    /**
     * Shutdown listening: when this instance's own node is deleted, {@code schedulerFacade.shutdownInstance}
     * has to be called to shut down the scheduled job of this node.
     */
    private final ShutdownListenerManager shutdownListenerManager;

    /**
     * Trigger listening: when a trigger node that targets this node appears, the trigger node is cleared
     * and {@code triggerJob} is called to execute the job once immediately.
     */
    private final TriggerListenerManager triggerListenerManager;

    /**
     * Reschedule listening: when {@code cron} in the config node changes, {@code rescheduleJob} is called
     * to reschedule the scheduled job.
     */
    private final RescheduleListenerManager rescheduleListenerManager;

    /**
     * Guarantee listening.
     * <ol>
     * <li>When the start node changes, the listener's {@code notifyWaitingTaskStart} is called to signal that the task started.</li>
     * <li>When the complete node changes, the listener's {@code notifyWaitingTaskComplete} is called to signal that the task completed.</li>
     * </ol>
     */
    private final GuaranteeListenerManager guaranteeListenerManager;

    /**
     * Watches the state of the connection to ZooKeeper. When the connection becomes suspended or lost the job
     * is paused; once it is re-established, this node's server and instance nodes are written to ZooKeeper again,
     * this node's running nodes are cleared and the scheduled job is rescheduled.
     */
    private final RegistryCenterConnectionStateListener regCenterConnectionStateListener;

    /**
     * Creates the job node storage, registers the job's notify executor with {@code ListenerNotifierManager}
     * and builds one listener manager per concern. Nothing is started here; see {@link #startAllListeners()}.
     *
     * @param regCenter registry center handed to the job node storage and to every listener manager
     * @param jobName name of the job the listeners belong to
     * @param elasticJobListeners job listeners handed to the guarantee listener manager
     */
    public ListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection<ElasticJobListener> elasticJobListeners) {
        this.jobNodeStorage = new JobNodeStorage(regCenter, jobName);
        ListenerNotifierManager.getInstance().registerJobNotifyExecutor(jobName);
        this.electionListenerManager = new ElectionListenerManager(regCenter, jobName);
        this.shardingListenerManager = new ShardingListenerManager(regCenter, jobName);
        this.failoverListenerManager = new FailoverListenerManager(regCenter, jobName);
        this.monitorExecutionListenerManager = new MonitorExecutionListenerManager(regCenter, jobName);
        this.shutdownListenerManager = new ShutdownListenerManager(regCenter, jobName);
        this.triggerListenerManager = new TriggerListenerManager(regCenter, jobName);
        this.rescheduleListenerManager = new RescheduleListenerManager(regCenter, jobName);
        this.guaranteeListenerManager = new GuaranteeListenerManager(regCenter, jobName, elasticJobListeners);
        this.regCenterConnectionStateListener = new RegistryCenterConnectionStateListener(regCenter, jobName);
    }

    /**
     * Start all listeners.
     *
     * <p>Starts every listener that watches for data changes on ZooKeeper paths, in a fixed order
     * (election, sharding, failover, monitor execution, shutdown, trigger, reschedule, guarantee),
     * and finally registers the connection state listener on the job node storage.</p>
     */
    public void startAllListeners() {
        // Listener managers reacting to node data changes; the call order is kept as is.
        this.electionListenerManager.start();
        this.shardingListenerManager.start();
        this.failoverListenerManager.start();
        this.monitorExecutionListenerManager.start();
        this.shutdownListenerManager.start();
        this.triggerListenerManager.start();
        this.rescheduleListenerManager.start();
        this.guaranteeListenerManager.start();
        // Unlike the managers above, this listener follows the connection state rather than node data.
        this.jobNodeStorage.addConnectionStateListener(this.regCenterConnectionStateListener);
    }
}
